reduce()

Reduction repeatedly applies a binary operator to the elements of a vector, accumulating them into a single value.
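
For example, reduce(+, 1:4) combines the elements pairwise from the left, grouping the sums as ((1+2)+3)+4. A conceptual sketch of that grouping (not a cell from the original session):

reduce(+, 1:4)     # computes ((1+2)+3)+4 = 10
+(+(+(1,2),3),4)   # the same fold written out explicitly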


In [1]:
[reduce(+, 1:8) sum(1:8)]


Out[1]:
1x2 Array{Int64,2}:
 36  36

In [2]:
[reduce(*, 1:8) prod(1:8)]


Out[2]:
1x2 Array{Int64,2}:
 40320  40320

In [3]:
boring(a,b)=a
@show reduce(boring, 1:8)
boring2(a,b)=b
@show reduce(boring2, 1:8)


reduce(boring,1:8) => 1
reduce(boring2,1:8) => 8
Out[3]:
8

Because boring always returns its first argument, the reduction collapses to the first element of the vector; boring2 returns its second argument, so it collapses to the last.

You can also use reduce to compute Fibonacci numbers from their matrix recurrence.

$$\begin{pmatrix} f_2 \\ f_1 \end{pmatrix} = \begin{pmatrix} f_1 + f_0 \\ f_1 \end{pmatrix} = \begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix} \begin{pmatrix} f_1 \\ f_0 \end{pmatrix}$$

$$\begin{pmatrix} f_{n+1} \\ f_n \end{pmatrix} = \dots = \begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix}^n \begin{pmatrix} f_1 \\ f_0 \end{pmatrix}$$

From this, you can show that

$$\begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix}^n = \begin{pmatrix} f_{n+1} & f_n \\ f_n & f_{n-1} \end{pmatrix} $$

In [4]:
fib(j)=reduce(*, [[1 1; 1 0] for i=1:j])
map(fib, [4, 7])


Out[4]:
2-element Array{Array{Int64,2},1}:
 2x2 Array{Int64,2}:
 5  3
 3  2    
 2x2 Array{Int64,2}:
 21  13
 13   8
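
Consistent with the identity above, the (1,2) entry of fib(j) is the Fibonacci number $f_j$. A quick check (assuming the fib defined above; not a cell from the original session):

[fib(j)[1,2] for j=1:8]   # => [1,1,2,3,5,8,13,21]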

You can use reduce to evaluate more elaborate recurrences as well. For example, reduce can compute a Hadamard matrix from its definition in terms of its submatrices:

$$H_2 = \begin{pmatrix} H_1 & H_1 \\ H_1 & -H_1 \end{pmatrix} = \begin{pmatrix} 1 & 1 \\ 1 & -1 \end{pmatrix} \otimes H_1$$

and so on: in general, $H_{n+1} = \begin{pmatrix} 1 & 1 \\ 1 & -1 \end{pmatrix} \otimes H_n$.


In [5]:
Hadamard(n)=reduce(kron, [[1 1; 1 -1] for i=1:n])
Hadamard(3)


Out[5]:
8x8 Array{Int64,2}:
 1   1   1   1   1   1   1   1
 1  -1   1  -1   1  -1   1  -1
 1   1  -1  -1   1   1  -1  -1
 1  -1  -1   1   1  -1  -1   1
 1   1   1   1  -1  -1  -1  -1
 1  -1   1  -1  -1   1  -1   1
 1   1  -1  -1  -1  -1   1   1
 1  -1  -1   1  -1   1   1  -1

In [6]:
ans*ans' #The product H*H' = 8I confirms this is a legitimate Hadamard matrix


Out[6]:
8x8 Array{Int64,2}:
 8  0  0  0  0  0  0  0
 0  8  0  0  0  0  0  0
 0  0  8  0  0  0  0  0
 0  0  0  8  0  0  0  0
 0  0  0  0  8  0  0  0
 0  0  0  0  0  8  0  0
 0  0  0  0  0  0  8  0
 0  0  0  0  0  0  0  8

In [7]:
#Reduction of matrix multiplications
M=[randn(2,2) for i=1:4]
printnice(x)=println(round(x,3))
printnice(M[4]*M[3]*M[2]*M[1])
printnice(reduce((A,B)->B*A, M)) #backward multiply
printnice(reduce(*, M))          #forward multiply


-7.087	4.835
2.571	-1.748

-7.087	4.835
2.571	-1.748

0.032	-0.079
-0.892	0.958

In the following example we apply reduce() to function composition:


In [8]:
h=reduce((f,g)->(x->f(g(x))), [sin cos tan])
[h(pi) sin(cos(tan(pi)))]


Out[8]:
1x2 Array{Float64,2}:
 0.841471  0.841471

prefix

Having discussed reduce, we are now ready for the idea behind prefix sum.

Suppose we want to compute the partial sums of a vector: given y[1:n], we want to overwrite y with the vector of partial sums

new_y[1] = y[1]
new_y[2] = y[1] + y[2]
new_y[3] = y[1] + y[2] + y[3]
...

At first blush, it seems impossible to parallelize this, since

new_y[1] = y[1]
new_y[2] = new_y[1] + y[2]
new_y[3] = new_y[2] + y[3]
...

which is an intrinsically serial process.


In [9]:
function prefix_serial!(y,+)
    @inbounds for i=2:length(y)
        y[i]=y[i-1]+y[i]
    end
    y
end


Out[9]:
prefix_serial! (generic function with 1 method)
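
For instance (an illustrative call, not from the original session):

prefix_serial!([1,2,3,4], +)   # => [1, 3, 6, 10]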

However, it turns out that because addition (+) is associative, we can regroup the order in which these sums are carried out. (This of course extends to other associative operations, such as multiplication.) One such reordering for a vector of length 8 is provided by prefix8!:


In [10]:
function prefix8!(y, *)
    length(y)==8 || error("length 8 only")
    for i in [2,4,6,8]; y[i]=y[i-1]*y[i]; end
    for i in [  4,  8]; y[i]=y[i-2]*y[i]; end
    for i in [      8]; y[i]=y[i-4]*y[i]; end
    for i in [    6  ]; y[i]=y[i-2]*y[i]; end
    for i in [ 3,5,7 ]; y[i]=y[i-1]*y[i]; end
    y
end

function prefix!(y,.*)
    l=length(y)
    k=int(ceil(log2(l)))
    @inbounds for j=1:k, i=2^j:2^j:min(l, 2^k)              #"reduce"
        y[i]=y[i-2^(j-1)].*y[i]
    end
    @inbounds for j=(k-1):-1:1, i=3*2^(j-1):2^j:min(l, 2^k) #"broadcast"
        y[i]=y[i-2^(j-1)].*y[i]
    end
    y
end


Out[10]:
prefix! (generic function with 1 method)
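
As a quick sanity check (a sketch assuming the definitions above; not a cell from the original session), prefix! should agree with prefix_serial! for any length:

for l = 1:20
    x = rand(1:10, l)   # integer data, so the comparison is exact
    @assert prefix!(copy(x), +) == prefix_serial!(copy(x), +)
end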

prefix! rearranges the operations to form two spanning trees: a "reduce" tree that accumulates partial results, followed by a "broadcast" tree that fills in the remaining entries:


In [11]:
using Compose
function gate(proc_from, proc_to, depth, num_procs;
              radius_in=0.1/num_procs, radius_gate=0.25/num_procs)
    const k::Int=ceil(log2(num_procs))              #Height of tree
    in1 = (proc_from-0.5)/num_procs, depth/(2k)     #Coordinates of input gate 1
    in2 = (proc_to-0.5)/num_procs, depth/(2k)       #Coordinates of input gate 2
    op  = (proc_to-0.5)/num_procs, (depth+0.5)/(2k) #Coordinates of gate operation
    compose(canvas(),
        compose(canvas(), circle(in1..., radius_in), circle(in2..., radius_in),
                          circle(op..., radius_gate), fill("white")),
        compose(canvas(), lines(in1, op), lines(in2, op), linewidth(0.3mm)))
end

function drawcircuit(num_procs::Int)
    circuit=Compose.CanvasTree[]
    const k::Int=ceil(log2(num_procs))                    #Height of tree
    for j=1:k, i=2^j:2^j:min(num_procs, 2^k)              #Draw "reduce" tree
        push!(circuit, gate(i-2^(j-1), i,    j, num_procs))
    end
    for j=(k-1):-1:1, i=3*2^(j-1):2^j:min(num_procs, 2^k) #Draw "broadcast" tree
        push!(circuit, gate(i-2^(j-1), i, 2k-j, num_procs))
    end
    push!(circuit, compose(canvas(), #Draw vertical lines for each processor
      [lines(((i-0.5)/num_procs,0),((i-0.5)/num_procs,1)) for i=1:num_procs]...,
      linewidth(0.1mm), stroke("grey")))
    compose(canvas(), circuit...)
end
@time drawcircuit(8)


elapsed time: 0.532384071 seconds (12668788 bytes allocated)

Out[11]:
[circuit diagram: "reduce" and "broadcast" trees for 8 processors]

Now let's run prefix! in parallel across multiple worker processes. Use addprocs to attach additional workers.


In [12]:
@show Sys.CPU_CORES
@show nprocs() #Current number of processes
@show procs()  #List of processes by index

#Add a specified number of processors
num_procs = 8 # set to Sys.CPU_CORES to use every core
addprocs(max(0, num_procs-nprocs()))

@show nprocs() #Current number of processes
@show procs()  #List of processes by index

import Base: fetch, length, +, * #Import to extend these with new methods
fetch(t::Vector) = map(fetch, t) #Vectorize fetch

#Define elementary operations on remote data
length(r1::RemoteRef)=length(fetch(r1))
+(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)+fetch(r2)
*(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)*fetch(r2)


Sys.CPU_CORES => 80
nprocs() => 1
procs() => [1]
nprocs() => 8
procs() => [1,2,3,4,5,6,7,8]
Out[12]:
* (generic function with 133 methods)

In [13]:
#This line of code corresponds to one element of the diagram
# +(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)+fetch(r2)
drawcircuit(2)


Out[13]:
[diagram of a single gate combining two processors]

For 8 processes, the serial version requires 7 operations. The parallel version uses 11 operations, but they are grouped into 5 stages of simultaneous operations. Hence we expect the parallel version to run in 5/7 of the time needed for the naïve serial code.
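
The counts can be read off from the index ranges in prefix! (a sketch assuming the definitions above, with l = 8 and k = 3; not a cell from the original session):

l, k = 8, 3
reduce_ops    = [length(2^j:2^j:l) for j=1:k]            # [4, 2, 1]: 3 stages
broadcast_ops = [length(3*2^(j-1):2^j:l) for j=k-1:-1:1] # [1, 3]: 2 stages
sum([reduce_ops; broadcast_ops])                          # 11 operations in 5 stages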


In [15]:
#Before timing, force Julia to JIT compile the functions
for f in (prefix_serial!, prefix!)
    f([randn(3,3) for i=1:2], *)
end

n=2048
@time r=RemoteRef[@spawnat p randn(n,n) for p in procs()] #Create RemoteRefs
s=fetch(r) #These are in-memory matrices
t=copy(r)  #These are RemoteRefs

tic(); prefix_serial!(s, *); t_ser = toc()
tic(); @sync prefix!(t, *); t_par = toc() #Caution: race condition bug #4330
@printf("Serial: %.3f sec  Parallel: %.3f sec  speedup: %.3fx (theory=1.4x)", t_ser, t_par, t_ser/t_par)


elapsed time: 0.13184095 seconds (33593504 bytes allocated)
elapsed time: 15.151019988 seconds
elapsed time: 11.768449118 seconds
Serial: 15.151 sec  Parallel: 11.768 sec  speedup: 1.287x (theory=1.4x)

The exact same serial code now runs in parallel thanks to multiple dispatch!


In [18]:
#Which method is used with these arguments?
@show typeof(s[1])
which(*, s[1:2]...)
@show typeof(t[1])
which(*, t[1:2]...)


typeof(s[1]) => Array{Float64,2}
*{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32})}(A::Union(SubArray{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32}),2,A<:DenseArray{T,N},I<:(Union(Range{Int64},Int64,Range1{Int64})...,)},DenseArray{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32}),2}),B::Union(SubArray{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32}),2,A<:DenseArray{T,N},I<:(Union(Range{Int64},Int64,Range1{Int64})...,)},DenseArray{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32}),2})) at linalg/matmul.jl:92
typeof(t[1]) => RemoteRef
*(r1::RemoteRef,r2::RemoteRef) at In[12]:18
